import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;



/**
 * Keeps a running per-key sum of the {@code f1} values it receives and emits that sum every
 * {@code emitThreshold} input records for the key.
 *
 * <p>Input and output are both {@code Tuple2<String, Integer>}. Each input record is presumably the
 * count of one word within a single line, i.e. {@code (word, countInLine)} (TODO confirm against
 * the upstream operator). Each output {@code (word, runningSum)} is the total of all {@code f1}
 * values seen so far for that key, aggregated across lines.
 *
 * <p>This function uses keyed {@link ValueState}, so it must be applied to a keyed stream (for
 * example after {@code keyBy} on the word). Keyed state is not available on non-keyed streams.
 */
public class RichFlatMapFunction1
        extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    /** Records per key between emissions when no explicit threshold is given. */
    public static final int DEFAULT_EMIT_THRESHOLD = 3;

    /** Emit the running sum once this many records have arrived for a key since the last emit. */
    private final int emitThreshold;

    /**
     * Per-key state: {@code f0} = records received since the last emit, {@code f1} = running sum of
     * the input {@code f1} values (never reset). Obtained in {@link #open}, hence transient.
     */
    private transient ValueState<Tuple2<Integer, Integer>> countSumValueState;

    /** Creates a function that emits every {@value #DEFAULT_EMIT_THRESHOLD} records per key. */
    public RichFlatMapFunction1() {
        this(DEFAULT_EMIT_THRESHOLD);
    }

    /**
     * Creates a function that emits the running sum every {@code emitThreshold} records per key.
     *
     * @param emitThreshold number of records per key between emissions; must be at least 1
     * @throws IllegalArgumentException if {@code emitThreshold < 1}
     */
    public RichFlatMapFunction1(int emitThreshold) {
        if (emitThreshold < 1) {
            throw new IllegalArgumentException("emitThreshold must be >= 1, got " + emitThreshold);
        }
        this.emitThreshold = emitThreshold;
    }

    /**
     * Obtains the keyed state handle.
     *
     * <p>The descriptor only describes the state (its name and element type). The state itself is
     * stored and checkpointed by Flink's configured state backend.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Tuple2<Integer, Integer>> countSumValueStateDesc =
                new ValueStateDescriptor<>(
                        "countSumValueState",
                        TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {}));
        countSumValueState = getRuntimeContext().getState(countSumValueStateDesc);
    }

    /**
     * Adds {@code value.f1} to the current key's running sum.
     *
     * <p>On every {@code emitThreshold}-th record for the key, it emits {@code (value.f0, runningSum)}
     * and resets the record counter. The running sum itself is kept.
     *
     * @param value input record {@code (word, count)}; {@code value.f1} must be non-null
     * @param out collector that receives {@code (word, runningSum)} tuples
     */
    @Override
    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out)
            throws Exception {
        // Read the state once; each value() call may deserialize it (e.g. with RocksDB).
        Tuple2<Integer, Integer> current = countSumValueState.value();
        if (current == null) {
            // First record seen for this key.
            current = Tuple2.of(0, 0);
        }

        // f0 counts records (input lines) since the last emit; f1 is the running sum.
        int count = current.f0 + 1;
        int valueSum = current.f1 + value.f1;

        if (count >= emitThreshold) {
            out.collect(Tuple2.of(value.f0, valueSum));
            // Reset only the record counter; the running sum keeps accumulating.
            count = 0;
        }
        countSumValueState.update(Tuple2.of(count, valueSum));
    }
}